Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added inter-task communication channels #12264

Merged
merged 1 commit into from
Jul 28, 2015
Merged

Conversation

amitmurthy
Copy link
Contributor

This PR implements only the inter-task (not inter-process) channels part of #12042

The default channel (of size 32) is faster than produce-consume as seen below.

julia> function foo(t::Task)
           while true
               consume(t)
           end
       end;

julia> function foo(n::Int)
           self = current_task()
           @schedule foo(self)
           for i in 1:10^n
               produce(i)
           end
       end;

julia> foo(1);

julia> @time foo(6);
   2.226 seconds      (2005 k allocations: 31484 KB, 0.17% gc time)

julia> function bar(c::Channel)
           while true
               take!(c)
           end
       end;

julia> function bar(n, c)
           @schedule bar(c)
           for i in 1:10^n
               put!(c, i)
           end
       end;

julia> bar(1, Channel())

julia> @time bar(6, Channel())
 104.415 milliseconds (1125 k allocations: 21483 KB, 2.48% gc time)                                                                                                                      

julia> bar(1, Channel(1))

julia> @time bar(6, Channel(1))
   2.287 seconds      (5000 k allocations: 198 MB, 0.74% gc time)                                                                                                                        ```

@amitmurthy
Copy link
Contributor Author

Any objections to merging this? Both the feature and code is pretty straightforward and doesn't break any existing stuff.

I'll rebase #12042 if/when this gets merged. That will make is easier to review #12042 and we can take a call if we want to go head with the relatively major change of #12042.

take_pos::Int # read position
put_pos::Int # write position

function Channel(sz)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should throw an error rather the silently truncate. I feel the user should also be able. To define a Chanel length greater than 32.

I guess you could also throw a nicer error checking if the Chanel size is greater than or equal to 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can define a channel size greater than 32. data is a circular buffer that is managed via szp1, take_pos and put_pos. The initial allocation is 32, It is grown on demand to a maximum size of sz_max. Even a channel of typemax(Int) will only allocate storage for 32 objects initially.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A further optimization that should be done is halving data once the current number of objects is less than 50% of data length

@amitmurthy amitmurthy force-pushed the amitm/channels_only branch 2 times, most recently from bbbd932 to 067746d Compare July 26, 2015 06:38
@amitmurthy
Copy link
Contributor Author

I'll merge this in a day or two if there are no further issues.

amitmurthy added a commit that referenced this pull request Jul 28, 2015
Added inter-task communication channels
@amitmurthy amitmurthy merged commit 9738275 into master Jul 28, 2015
@amitmurthy amitmurthy deleted the amitm/channels_only branch July 28, 2015 11:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants